<?php
namespace Swork\Client;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Swork\Exception\AmqpException;
use Swork\Pool\Amqp\AmqpConfig;
use Swork\Pool\ConnectionInterface;
use Swork\Pool\PoolCollector;
use Swork\Pool\PoolInterface;
use Swork\Service;

/**
 * AMQP publisher client backed by a pooled connection.
 *
 * An instance is bound to one queue name at a time (see setQueueName()) and
 * publishes messages to it through the default exchange. Failed publishes are
 * retried; the attempt counter is kept in the server's shared memory table,
 * keyed by a hash of the queue name and the message body.
 */
class Amqp
{
    /**
     * Connection pool the AMQP connections are borrowed from.
     * @var PoolInterface
     */
    private $pool;

    /**
     * Name of the queue messages are published to.
     * @var string
     */
    private $queueName;

    /**
     * Whether the current queue has already been declared by this instance.
     * @var bool
     */
    private $declare = false;

    /**
     * Shared memory table used to count failed publish attempts.
     * @var \swoole_table
     */
    private $table = null;

    /**
     * Amqp constructor: fetches the 'default' AMQP pool and the server's memory table.
     */
    public function __construct()
    {
        $this->pool = PoolCollector::getCollector(PoolCollector::Amqp, 'default');
        $this->table = Service::$server->table;
    }

    /**
     * Generic parameter entry point.
     *
     * A string is used as the queue name directly; an array is expected to
     * carry the queue name under the 'name' key. Any other value is ignored.
     *
     * @param mixed $value queue name, or an array containing a 'name' entry
     */
    public function setParams($value)
    {
        if (is_string($value))
        {
            $this->setQueueName($value);
        }
        else if (is_array($value) && isset($value['name']))
        {
            $this->setQueueName($value['name']);
        }
    }

    /**
     * Set the queue name.
     *
     * Switching to a different queue clears the "already declared" flag so
     * that the next publish declares the new queue. Without this, messages
     * routed through the default exchange to a queue that was never declared
     * would be dropped by the broker.
     *
     * @param string $queueName
     */
    public function setQueueName(string $queueName)
    {
        if ($queueName !== $this->queueName)
        {
            $this->declare = false;
        }
        $this->queueName = $queueName;
    }

    /**
     * Get the queue name.
     * @return string
     */
    public function getQueueName()
    {
        return $this->queueName;
    }

    /**
     * Get the connection pool.
     * @return bool|PoolInterface
     */
    public function getPool()
    {
        return $this->pool;
    }

    /**
     * Unwrap the underlying AMQP connection from a pooled connection.
     * @param ConnectionInterface $conn
     * @return AMQPStreamConnection|false
     */
    public function getConnection(ConnectionInterface $conn)
    {
        return $conn->getConnection();
    }

    /**
     * Get the pool configuration.
     * @return AmqpConfig
     */
    public function getConfig()
    {
        return $this->pool->getConfig();
    }

    /**
     * Return a pooled connection to the pool.
     * @param ConnectionInterface $conn
     */
    public function releaseConnection(ConnectionInterface $conn)
    {
        $this->pool->releaseConnection($conn);
    }

    /**
     * Publish a message to the configured queue via the default exchange.
     *
     * Array bodies are JSON-encoded. On failure the publish is retried until
     * the number of recorded failures exceeds the configured retry count
     * (getPubRetry()), at which point the last exception is rethrown. Each
     * attempt borrows a connection from the pool and returns it before the
     * next attempt starts, so retries never hold more than one connection.
     *
     * @param string|array $body message content
     * @param array $args extra message properties passed to AMQPMessage
     * @throws AmqpException when no queue name is set (code 5601)
     * @throws \Throwable the last failure once the retries are exhausted
     */
    public function basePublish($body, array $args = [])
    {
        if (empty($this->queueName))
        {
            throw new AmqpException('no queue name defined', 5601);
        }

        // Normalise the payload
        if (is_array($body))
        {
            $body = json_encode($body, JSON_UNESCAPED_UNICODE);
        }
        $message = new AMQPMessage($body, $args);

        // Key of the failure counter in the memory table
        $mkey = md5($this->queueName . $body);

        while (true)
        {
            $conn = null;
            $channel = null;
            try
            {
                // Borrow a connection, reconnecting once if it is not usable
                $conn = $this->pool->getConnection();
                $connection = $this->getConnection($conn);
                if (!$connection)
                {
                    $conn->reconnect();
                    $connection = $this->getConnection($conn);
                }
                if (!$connection)
                {
                    throw new AmqpException('no connection fetched', 5602);
                }

                // Open a channel
                $channel = $connection->channel();
                if (!$channel)
                {
                    throw new AmqpException('no channel fetched', 5603);
                }

                // Declare the queue (durable) the first time it is used
                if (!$this->declare)
                {
                    $channel->queue_declare($this->queueName, false, true, false, false, false);
                    $this->declare = true;
                }

                // Publish
                $channel->basic_publish($message, '', $this->queueName);

                // Success: clear any failure counter left by earlier attempts
                if ($this->table->exist($mkey))
                {
                    $this->table->del($mkey);
                }
                return;
            }
            catch (\Throwable $ex)
            {
                $code = $ex->getCode();

                // A missing queue name can never be fixed by retrying
                if ($code == 5601)
                {
                    throw $ex;
                }

                // 104 / 32 are treated as a broken socket: reconnect first.
                // $conn is not set when borrowing from the pool itself failed.
                if (($code == 104 || $code == 32) && $conn instanceof ConnectionInterface)
                {
                    Service::$logger->error('AMQP basePublish reconnect - ' . $code);
                    $conn->reconnect();
                }

                // Record the failure and give up once the limit is exceeded
                $retry = $this->getConfig()->getPubRetry();
                $tried = $this->table->incr($mkey, 'num');
                if ($tried > $retry)
                {
                    $this->table->del($mkey);
                    throw $ex;
                }

                Service::$logger->error('AMQP basePublish try [' . $tried . '] [' . $ex->getMessage() . ' - ' . $code . ']');

                // Fall through: finally releases the connection, then the loop retries
            }
            finally
            {
                // Closing a channel on a broken connection may throw; that must
                // neither hide the real outcome nor prevent the connection
                // from being handed back to the pool.
                try
                {
                    if ($channel)
                    {
                        $channel->close();
                    }
                }
                catch (\Throwable $closeEx)
                {
                    Service::$logger->error('AMQP basePublish channel close failed [' . $closeEx->getMessage() . ' - ' . $closeEx->getCode() . ']');
                }

                if ($conn instanceof ConnectionInterface)
                {
                    $this->releaseConnection($conn);
                }
            }
        }
    }
}
